home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-desktop-9.10-i386-PL.iso / casper / filesystem.squashfs / usr / share / system-config-printer / troubleshoot / PrintTestPage.py < prev    next >
Text File  |  2009-10-19  |  20KB  |  522 lines

  1. #!/usr/bin/env python
  2.  
  3. ## Printing troubleshooter
  4.  
  5. ## Copyright (C) 2008, 2009 Red Hat, Inc.
  6. ## Copyright (C) 2008, 2009 Tim Waugh <twaugh@redhat.com>
  7.  
  8. ## This program is free software; you can redistribute it and/or modify
  9. ## it under the terms of the GNU General Public License as published by
  10. ## the Free Software Foundation; either version 2 of the License, or
  11. ## (at your option) any later version.
  12.  
  13. ## This program is distributed in the hope that it will be useful,
  14. ## but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16. ## GNU General Public License for more details.
  17.  
  18. ## You should have received a copy of the GNU General Public License
  19. ## along with this program; if not, write to the Free Software
  20. ## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  21.  
  22. import cups
  23. import dbus
  24. import dbus.glib
  25. import gobject
  26. import os
  27. import pango
  28. import tempfile
  29. import time
  30. from timedops import TimedOperation, OperationCanceled
  31.  
  32. from base import *
  33.  
  34. import errordialogs
  35. errordialogs.set_gettext_function (_)
  36. from errordialogs import *
  37.  
  38. DBUS_PATH="/com/redhat/PrinterSpooler"
  39. DBUS_IFACE="com.redhat.PrinterSpooler"
  40. class PrintTestPage(Question):
  41.     STATE = { cups.IPP_JOB_PENDING: _("Pending"),
  42.               cups.IPP_JOB_HELD: _("Held"),
  43.               cups.IPP_JOB_PROCESSING: _("Processing"),
  44.               cups.IPP_JOB_STOPPED: _("Stopped"),
  45.               cups.IPP_JOB_CANCELED: _("Canceled"),
  46.               cups.IPP_JOB_ABORTED: _("Aborted"),
  47.               cups.IPP_JOB_COMPLETED: _("Completed") }
  48.  
  49.     def __init__ (self, troubleshooter):
  50.         Question.__init__ (self, troubleshooter, "Print test page")
  51.         page = gtk.VBox ()
  52.         page.set_spacing (12)
  53.         page.set_border_width (12)
  54.  
  55.         label = gtk.Label ()
  56.         label.set_alignment (0, 0)
  57.         label.set_use_markup (True)
  58.         label.set_line_wrap (True)
  59.         page.pack_start (label, False, False, 0)
  60.         self.main_label = label
  61.         self.main_label_text = ('<span weight="bold" size="larger">' +
  62.                                 _("Test Page") + '</span>\n\n' +
  63.                                 _("Now print a test page.  If you are having "
  64.                                   "problems printing a specific document, "
  65.                                   "print that document now and mark the print "
  66.                                   "job below."))
  67.  
  68.         hbox = gtk.HButtonBox ()
  69.         hbox.set_border_width (0)
  70.         hbox.set_spacing (3)
  71.         hbox.set_layout (gtk.BUTTONBOX_START)
  72.         self.print_button = gtk.Button (_("Print Test Page"))
  73.         hbox.pack_start (self.print_button, False, False, 0)
  74.  
  75.         self.cancel_button = gtk.Button (_("Cancel All Jobs"))
  76.         hbox.pack_start (self.cancel_button, False, False, 0)
  77.         page.pack_start (hbox, False, False, 0)
  78.  
  79.         tv = gtk.TreeView ()
  80.         test_cell = gtk.CellRendererToggle ()
  81.         test = gtk.TreeViewColumn (_("Test"), test_cell, active=0)
  82.         job = gtk.TreeViewColumn (_("Job"), gtk.CellRendererText (), text=1)
  83.         printer_cell = gtk.CellRendererText ()
  84.         printer = gtk.TreeViewColumn (_("Printer"), printer_cell, text=2)
  85.         name_cell = gtk.CellRendererText ()
  86.         name = gtk.TreeViewColumn (_("Document"), name_cell, text=3)
  87.         status = gtk.TreeViewColumn (_("Status"), gtk.CellRendererText (),
  88.                                      text=4)
  89.         test_cell.set_radio (False)
  90.         self.test_cell = test_cell
  91.         printer.set_resizable (True)
  92.         printer_cell.set_property ("ellipsize", pango.ELLIPSIZE_END)
  93.         printer_cell.set_property ("width-chars", 20)
  94.         name.set_resizable (True)
  95.         name_cell.set_property ("ellipsize", pango.ELLIPSIZE_END)
  96.         name_cell.set_property ("width-chars", 20)
  97.         status.set_resizable (True)
  98.         tv.append_column (test)
  99.         tv.append_column (job)
  100.         tv.append_column (printer)
  101.         tv.append_column (name)
  102.         tv.append_column (status)
  103.         tv.set_rules_hint (True)
  104.         sw = gtk.ScrolledWindow ()
  105.         sw.set_policy (gtk.POLICY_AUTOMATIC, gtk.POLICY_AUTOMATIC)
  106.         sw.set_shadow_type (gtk.SHADOW_IN)
  107.         sw.add (tv)
  108.         self.treeview = tv
  109.         page.pack_start (sw)
  110.  
  111.         label = gtk.Label (_("Did the marked print jobs print correctly?"))
  112.         label.set_line_wrap (True)
  113.         label.set_alignment (0, 0)
  114.         page.pack_start (label, False, False, 0)
  115.  
  116.         vbox = gtk.VBox ()
  117.         vbox.set_spacing (6)
  118.         self.yes = gtk.RadioButton (label=_("Yes"))
  119.         no = gtk.RadioButton (label=_("No"))
  120.         no.set_group (self.yes)
  121.         vbox.pack_start (self.yes, False, False, 0)
  122.         vbox.pack_start (no, False, False, 0)
  123.         page.pack_start (vbox, False, False, 0)
  124.         self.persistent_answers = {}
  125.         troubleshooter.new_page (page, self)
  126.  
  127.     def display (self):
  128.         answers = self.troubleshooter.answers
  129.         if not answers.has_key ('cups_queue'):
  130.             return False
  131.  
  132.         parent = self.troubleshooter.get_window ()
  133.         self.authconn = answers['_authenticated_connection']
  134.         mediatype = None
  135.         defaults = answers.get ('cups_printer_ppd_defaults', {})
  136.         for opts in defaults.values ():
  137.             for opt, value in opts.iteritems ():
  138.                 if opt == "MediaType":
  139.                     mediatype = value
  140.                     break
  141.  
  142.         if mediatype != None:
  143.             mediatype_string = '\n\n' + (_("Remember to load paper of type "
  144.                                            "'%s' into the printer first.") %
  145.                                          mediatype)
  146.         else:
  147.             mediatype_string = ""
  148.  
  149.         label_text = self.main_label_text + mediatype_string
  150.         self.main_label.set_markup (label_text)
  151.  
  152.         model = gtk.ListStore (gobject.TYPE_BOOLEAN,
  153.                                gobject.TYPE_INT,
  154.                                gobject.TYPE_STRING,
  155.                                gobject.TYPE_STRING,
  156.                                gobject.TYPE_STRING)
  157.         self.treeview.set_model (model)
  158.         self.job_to_iter = {}
  159.  
  160.         test_jobs = self.persistent_answers.get ('test_page_job_id', [])
  161.         def get_jobs ():
  162.             c = self.authconn
  163.             jobs_dict = c.getJobs (which_jobs='not-completed',
  164.                                    my_jobs=False)
  165.             completed_jobs_dict = c.getJobs (which_jobs='completed')
  166.             return (jobs_dict, completed_jobs_dict)
  167.  
  168.         self.op = TimedOperation (get_jobs, parent=parent)
  169.         try:
  170.             (jobs_dict, completed_jobs_dict) = self.op.run ()
  171.         except OperationCanceled:
  172.             return False
  173.  
  174.         # We want to display the jobs in the queue for this printer...
  175.         try:
  176.             queue_uri_ending = "/" + self.troubleshooter.answers['cups_queue']
  177.             jobs_on_this_printer = filter (lambda x:
  178.                                                jobs_dict[x]['job-printer-uri'].\
  179.                                                endswith (queue_uri_ending),
  180.                                            jobs_dict.keys ())
  181.         except:
  182.             jobs_on_this_printer = []
  183.  
  184.         # ...as well as any other jobs we've previous submitted as test pages.
  185.         jobs = list (set(test_jobs).union (set (jobs_on_this_printer)))
  186.  
  187.         completed_jobs_dict = None
  188.         for job in jobs:
  189.             try:
  190.                 j = jobs_dict[job]
  191.             except KeyError:
  192.                 try:
  193.                     j = completed_jobs_dict[job]
  194.                 except KeyError:
  195.                     continue
  196.  
  197.             iter = model.append (None)
  198.             self.job_to_iter[job] = iter
  199.             model.set_value (iter, 0, job in test_jobs)
  200.             model.set_value (iter, 1, job)
  201.             self.update_job (job, j)
  202.  
  203.         return True
  204.  
  205.     def connect_signals (self, handler):
  206.         self.print_sigid = self.print_button.connect ("clicked",
  207.                                                       self.print_clicked)
  208.         self.cancel_sigid = self.cancel_button.connect ("clicked",
  209.                                                         self.cancel_clicked)
  210.         self.test_sigid = self.test_cell.connect ('toggled',
  211.                                                   self.test_toggled)
  212.  
  213.         def create_subscription ():
  214.             c = self.authconn
  215.             sub_id = c.createSubscription ("/",
  216.                                            events=["job-created",
  217.                                                    "job-completed",
  218.                                                    "job-stopped",
  219.                                                    "job-progress",
  220.                                                    "job-state-changed"])
  221.             return sub_id
  222.  
  223.         parent = self.troubleshooter.get_window ()
  224.         self.op = TimedOperation (create_subscription, parent=parent)
  225.         try:
  226.             self.sub_id = self.op.run ()
  227.         except OperationCanceled:
  228.             pass
  229.  
  230.         try:
  231.             bus = dbus.SystemBus ()
  232.         except:
  233.             bus = None
  234.  
  235.         self.bus = bus
  236.         if bus:
  237.             bus.add_signal_receiver (self.handle_dbus_signal,
  238.                                      path=DBUS_PATH,
  239.                                      dbus_interface=DBUS_IFACE)
  240.  
  241.         self.timer = gobject.timeout_add (1000, self.update_jobs_list)
  242.  
  243.     def disconnect_signals (self):
  244.         if self.bus:
  245.             self.bus.remove_signal_receiver (self.handle_dbus_signal,
  246.                                              path=DBUS_PATH,
  247.                                              dbus_interface=DBUS_IFACE)
  248.                                              
  249.         self.print_button.disconnect (self.print_sigid)
  250.         self.cancel_button.disconnect (self.cancel_sigid)
  251.         self.test_cell.disconnect (self.test_sigid)
  252.  
  253.         def cancel_subscription (sub_id):
  254.             c = self.authconn
  255.             c.cancelSubscription (sub_id)
  256.  
  257.         parent = self.troubleshooter.get_window ()
  258.         self.op = TimedOperation (cancel_subscription,
  259.                                   (self.sub_id,),
  260.                                   parent=parent)
  261.         try:
  262.             self.op.run ()
  263.         except OperationCanceled:
  264.             pass
  265.  
  266.         try:
  267.             del self.sub_seq
  268.         except:
  269.             pass
  270.  
  271.         gobject.source_remove (self.timer)
  272.  
  273.     def collect_answer (self):
  274.         if not self.displayed:
  275.             return {}
  276.  
  277.         self.answers = self.persistent_answers.copy ()
  278.         parent = self.troubleshooter.get_window ()
  279.         success = self.yes.get_active ()
  280.         self.answers['test_page_successful'] = success
  281.  
  282.         class collect_jobs:
  283.             def __init__ (self, model):
  284.                 self.jobs = []
  285.                 model.foreach (self.each, None)
  286.  
  287.             def each (self, model, path, iter, user_data):
  288.                 self.jobs.append (model.get (iter, 0, 1, 2, 3, 4))
  289.  
  290.         model = self.treeview.get_model ()
  291.         jobs = collect_jobs (model).jobs
  292.         def collect_attributes (jobs):
  293.             job_attrs = None
  294.             c = self.authconn
  295.             with_attrs = []
  296.             for (test, jobid, printer, doc, status) in jobs:
  297.                 attrs = None
  298.                 if test:
  299.                     try:
  300.                         attrs = c.getJobAttributes (jobid)
  301.                     except AttributeError:
  302.                         # getJobAttributes was introduced in pycups 1.9.35.
  303.                         if job_attrs == None:
  304.                             job_attrs = c.getJobs (which_jobs='all')
  305.  
  306.                         attrs = self.job_attrs[jobid]
  307.  
  308.                 with_attrs.append ((test, jobid, printer, doc, status, attrs))
  309.  
  310.             return with_attrs
  311.  
  312.         self.op = TimedOperation (collect_attributes,
  313.                                   (jobs,),
  314.                                   parent=parent)
  315.         try:
  316.             with_attrs = self.op.run ()
  317.             self.answers['test_page_job_status'] = with_attrs
  318.         except OperationCanceled:
  319.             pass
  320.  
  321.         return self.answers
  322.  
  323.     def cancel_operation (self):
  324.         self.op.cancel ()
  325.  
  326.         # Abandon the CUPS connection and make another.
  327.         answers = self.troubleshooter.answers
  328.         factory = answers['_authenticated_connection_factory']
  329.         self.authconn = factory.get_connection ()
  330.         self.answers['_authenticated_connection'] = self.authconn
  331.  
  332.     def handle_dbus_signal (self, *args):
  333.         debugprint ("D-Bus signal caught: updating jobs list soon")
  334.         gobject.source_remove (self.timer)
  335.         self.timer = gobject.timeout_add (200, self.update_jobs_list)
  336.  
  337.     def update_job (self, jobid, job_dict):
  338.         iter = self.job_to_iter[jobid]
  339.         model = self.treeview.get_model ()
  340.         try:
  341.             printer_name = job_dict['printer-name']
  342.         except KeyError:
  343.             try:
  344.                 uri = job_dict['job-printer-uri']
  345.                 r = uri.rfind ('/')
  346.                 printer_name = uri[r + 1:]
  347.             except KeyError:
  348.                 printer_name = None
  349.  
  350.         if printer_name != None:
  351.             model.set_value (iter, 2, printer_name)
  352.  
  353.         model.set_value (iter, 3, job_dict['job-name'])
  354.         model.set_value (iter, 4, self.STATE[job_dict['job-state']])
  355.  
  356.     def print_clicked (self, widget):
  357.         now = time.time ()
  358.         tt = time.localtime (now)
  359.         when = time.strftime ("%d/%b/%Y:%T %z", tt)
  360.         self.persistent_answers['test_page_attempted'] = when
  361.         answers = self.troubleshooter.answers
  362.         parent = self.troubleshooter.get_window ()
  363.  
  364.         def print_test_page (*args, **kwargs):
  365.             factory = answers['_authenticated_connection_factory']
  366.             c = factory.get_connection ()
  367.             return c.printTestPage (*args, **kwargs)
  368.  
  369.         tmpfname = None
  370.         mimetypes = [None, 'text/plain']
  371.         for mimetype in mimetypes:
  372.             try:
  373.                 if mimetype == None:
  374.                     # Default test page.
  375.                     self.op = TimedOperation (print_test_page,
  376.                                               (answers['cups_queue'],),
  377.                                               parent=parent)
  378.                     jobid = self.op.run ()
  379.                 elif mimetype == 'text/plain':
  380.                     (tmpfd, tmpfname) = tempfile.mkstemp ()
  381.                     os.write (tmpfd, "This is a test page.\n")
  382.                     os.close (tmpfd)
  383.                     self.op = TimedOperation (print_test_page,
  384.                                               (answers['cups_queue'],),
  385.                                               kwargs={'file': tmpfname,
  386.                                                       'format': mimetype},
  387.                                               parent=parent)
  388.                     jobid = self.op.run ()
  389.                     try:
  390.                         os.unlink (tmpfname)
  391.                     except OSError:
  392.                         pass
  393.  
  394.                     tmpfname = None
  395.  
  396.                 jobs = self.persistent_answers.get ('test_page_job_id', [])
  397.                 jobs.append (jobid)
  398.                 self.persistent_answers['test_page_job_id'] = jobs
  399.                 break
  400.             except OperationCanceled:
  401.                 self.persistent_answers['test_page_submit_failure'] = 'cancel'
  402.                 break
  403.             except RuntimeError:
  404.                 self.persistent_answers['test_page_submit_failure'] = 'connect'
  405.                 break
  406.             except cups.IPPError, (e, s):
  407.                 if (e == cups.IPP_DOCUMENT_FORMAT and
  408.                     mimetypes.index (mimetype) < (len (mimetypes) - 1)):
  409.                     # Try next format.
  410.                     if tmpfname != None:
  411.                         os.unlink (tmpfname)
  412.                         tmpfname = None
  413.                     continue
  414.  
  415.                 self.persistent_answers['test_page_submit_failure'] = (e, s)
  416.                 show_error_dialog (_("Error submitting test page"),
  417.                                    _("There was an error during the CUPS "
  418.                                      "operation: '%s'.") % s,
  419.                                    self.troubleshooter.get_window ())
  420.                 break
  421.  
  422.     def cancel_clicked (self, widget):
  423.         self.persistent_answers['test_page_jobs_cancelled'] = True
  424.         jobids = []
  425.         for jobid, iter in self.job_to_iter.iteritems ():
  426.             jobids.append (jobid)
  427.  
  428.         def cancel_jobs (jobids):
  429.             c = self.authconn
  430.             for jobid in jobids:
  431.                 try:
  432.                     c.cancelJob (jobid)
  433.                 except cups.IPPError, (e, s):
  434.                     if e != cups.IPP_NOT_POSSIBLE:
  435.                         self.persistent_answers['test_page_cancel_failure'] = (e, s)
  436.  
  437.         self.op = TimedOperation (cancel_jobs,
  438.                                   (jobids,),
  439.                                   parent=self.troubleshooter.get_window ())
  440.         try:
  441.             self.op.run ()
  442.         except OperationCanceled:
  443.             pass
  444.  
  445.     def test_toggled (self, cell, path):
  446.         model = self.treeview.get_model ()
  447.         iter = model.get_iter (path)
  448.         active = model.get_value (iter, 0)
  449.         model.set_value (iter, 0, not active)
  450.  
  451.     def update_jobs_list (self):
  452.         def get_notifications (self):
  453.             c = self.authconn
  454.             try:
  455.                 notifications = c.getNotifications ([self.sub_id],
  456.                                                     [self.sub_seq + 1])
  457.             except AttributeError:
  458.                 notifications = c.getNotifications ([self.sub_id])
  459.  
  460.             return notifications
  461.  
  462.         # Enter the GDK lock.  We need to do this because we were
  463.         # called from a timeout.
  464.         gtk.gdk.threads_enter ()
  465.  
  466.         parent = self.troubleshooter.get_window ()
  467.         self.op = TimedOperation (get_notifications,
  468.                                   (self,),
  469.                                   parent=parent)
  470.         try:
  471.             notifications = self.op.run ()
  472.         except OperationCanceled:
  473.             gtk.gdk.threads_leave ()
  474.             return True
  475.  
  476.         answers = self.troubleshooter.answers
  477.         model = self.treeview.get_model ()
  478.         queue = answers['cups_queue']
  479.         test_jobs = self.persistent_answers.get('test_page_job_id', [])
  480.         for event in notifications['events']:
  481.             seq = event['notify-sequence-number']
  482.             try:
  483.                 if seq <= self.sub_seq:
  484.                     # Work around a bug in pycups < 1.9.34
  485.                     continue
  486.             except AttributeError:
  487.                 pass
  488.             self.sub_seq = seq
  489.             job = event['notify-job-id']
  490.  
  491.             nse = event['notify-subscribed-event']
  492.             if nse == 'job-created':
  493.                 if (job in test_jobs or
  494.                     event['printer-name'] == queue):
  495.                     iter = model.append (None)
  496.                     self.job_to_iter[job] = iter
  497.                     model.set_value (iter, 0, True)
  498.                     model.set_value (iter, 1, job)
  499.                 else:
  500.                     continue
  501.             elif not self.job_to_iter.has_key (job):
  502.                 continue
  503.  
  504.             if (job in test_jobs and
  505.                 nse in ["job-stopped", "job-completed"]):
  506.                 comp = self.persistent_answers.get ('test_page_completions', [])
  507.                 comp.append ((job, event['notify-text']))
  508.                 self.persistent_answers['test_page_completions'] = comp
  509.  
  510.             self.update_job (job, event)
  511.  
  512.         # Update again when we're told to. (But we might update sooner if
  513.         # there is a D-Bus signal.)
  514.         gobject.source_remove (self.timer)
  515.         self.timer = gobject.timeout_add (1000 *
  516.                                           notifications['notify-get-interval'],
  517.                                           self.update_jobs_list)
  518.         debugprint ("Update again in %ds" %
  519.                     notifications['notify-get-interval'])
  520.         gtk.gdk.threads_leave ()
  521.         return False
  522.